package ds_industry_2025.ds.ds_01.T1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

/*
     2、抽取shtd_store库中sku_info的增量数据进入Hive的ods库中表sku_info。根据ods.sku_info表中create_time作为增量字段，只
     将新增的数据抽入，字段名称、类型不变，同时添加静态分区，分区字段为etl_date，类型为String，且值为当前比赛日的前一天日期（
     分区字段格式为yyyyMMdd）。使用hive cli执行show partitions ods.sku_info命令，将结果截图粘贴至客户端桌面【Release\
     任务B提交结果.docx】中对应的任务序号下；
 */
/**
 * Incremental extraction of `sku_info` from the MySQL database `shtd_store`
 * into the Hive table `ods.sku_info`.
 *
 * Only source rows whose `create_time` is strictly greater than the largest
 * `create_time` already stored in `ods.sku_info` are appended. Every appended
 * row receives an `etl_date` partition value (format `yyyyMMdd`) equal to the
 * day before the run date, computed in the JVM default time zone.
 */
object t2 {

  /**
   * Job entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("t1")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Fixed class name: the previous value carried a stray trailing "W".
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .enableHiveSupport()
      .getOrCreate()

    // try/finally guarantees the session is closed even when a read or write fails.
    try {
      // NOTE(review): JDBC credentials are hard-coded; consider supplying them externally.
      val conn = new Properties()
      conn.setProperty("user", "root")
      conn.setProperty("password", "123456")
      conn.setProperty("driver", "com.mysql.jdbc.Driver")

      // Partition value: the day before today, formatted as yyyyMMdd.
      val day = Calendar.getInstance()
      day.add(Calendar.DATE, -1)
      val yesterday = new SimpleDateFormat("yyyyMMdd").format(day.getTime)

      // High-water mark of data already loaded. max() yields NULL when the
      // target table holds no rows, so the value is wrapped in Option instead
      // of being dereferenced directly.
      val maxTime: Option[String] = spark.table("ods.sku_info")
        .agg(max(col("create_time")))
        .collect()
        .headOption
        .flatMap(row => Option(row.get(0)))
        .map(_.toString)

      val source = spark.read
        .jdbc("jdbc:mysql://192.168.40.110:3306/shtd_store?useSSL=false", "sku_info", conn)

      // With no high-water mark (empty target) every source row counts as new.
      val incremental = maxTime.fold(source) { ts =>
        source.where(col("create_time") > lit(ts).cast("timestamp"))
      }

      incremental
        .withColumn("etl_date", lit(yesterday))
        .write.mode("append")
        .partitionBy("etl_date")
        .saveAsTable("ods.sku_info")
    } finally {
      spark.close()
    }
  }

}
